fix(kafka): strip kafka config from Consumer.consume() args#432

Merged
CarlosGamero merged 1 commit into kibertoad:main from irfanh94:fix_strip_kafka_config_from_consumer_consume
May 8, 2026

Conversation


@irfanh94 irfanh94 commented May 8, 2026

Problem

AbstractKafkaConsumer.init() forwards the entire stored options — including the connection-level kafka config — into @platformatic/kafka's Consumer.consume(). Inside consume(), the MessagesStream constructor runs structuredClone(otherOptions) on whatever it doesn't explicitly destructure. Anything function-valued reachable from the kafka config crashes that clone with DOMException [DataCloneError], taking down consumer init.

The most common trigger is AWS MSK IAM authentication, which uses SASL/OAUTHBEARER with an async token provider:

sasl: {
  mechanism: 'OAUTHBEARER',
  token: async () => (await generateAuthToken({ region })).token,
}

Production stack trace:

DOMException [DataCloneError]: async () => { ... } could not be cloned.
    at structuredClone (node:internal/worker/js_transferable:126:26)
    at new MessagesStream (.../@platformatic/kafka/dist/clients/consumer/messages-stream.js:142:25)
    at #performConsume (.../@platformatic/kafka/dist/clients/consumer/consumer.js:942:24)
    ...
    at AbstractKafkaConsumer.init (.../@message-queue-toolkit/kafka/dist/AbstractKafkaConsumer.js:112:19)

The same class of bug would hit any function-valued field passed through SASL/OAUTHBEARER providers (Confluent Cloud OIDC, etc.).
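The failure mode is easy to reproduce outside the library. A minimal standalone sketch (the config shape here is illustrative, not the toolkit's actual types):

```typescript
// structuredClone rejects functions: any config object with a
// function-valued field anywhere inside it cannot be cloned.
const connectionConfig = {
  clientId: 'my-app',
  sasl: {
    mechanism: 'OAUTHBEARER',
    // async token provider, as with AWS MSK IAM auth
    token: async () => ({ token: 'fake' }),
  },
}

try {
  structuredClone(connectionConfig)
} catch (err) {
  // DOMException with name 'DataCloneError'
  console.log((err as Error).name)
}
```

Node's structuredClone (global since Node 17) throws the same DataCloneError the production trace shows, regardless of how deeply the function is nested.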

Fix

Exclude kafka from the spread that builds consumeOptions in init():

- const { handlers: _, ...consumeOptions } = this.options
+ const { handlers: _, kafka: __, ...consumeOptions } = this.options

kafka is connection-level config — it belongs on the Consumer constructor (where it's already spread separately), not on consume(). Stripping it makes the consume args structuredClone-safe regardless of what's inside the user's connection config.
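In sketch form (the type and function names below are illustrative, not the toolkit's actual declarations), the option flow after the fix looks like:

```typescript
// Hypothetical shape of the stored options: `kafka` holds connection-level
// config (brokers, SASL, possibly functions); the rest are consume options.
type StoredOptions = {
  kafka: { bootstrapBrokers: string[]; sasl?: unknown }
  handlers: Record<string, unknown>
  topics: string[]
  sessionTimeout?: number
}

function buildConsumeOptions(options: StoredOptions) {
  // Drop both handlers and kafka; everything left is plain data,
  // so MessagesStream's structuredClone can't hit a function.
  const { handlers: _, kafka: __, ...consumeOptions } = options
  return consumeOptions
}
```

The `kafka` object still reaches the Consumer constructor through its own spread; only the `consume()` call site changes.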

Tests

Two regression tests in PermissionConsumer.spec.ts that spy on Consumer.prototype.consume:

  • strips 'kafka' from consume options — asserts the kafka field is absent from the args.
  • keeps 'kafka' out of consume options even when it carries a function-valued field — injects a function into the kafka config (simulating AWS MSK IAM sasl.token) and asserts it never reaches consume().

Verified both fail on the unfixed code — the second test reproduces the exact production stack trace — and both pass with the fix applied.
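The spy-based assertion can be approximated without a test framework. A hand-rolled sketch (makeConsumeSpy and the option names are illustrative; the real tests spy on Consumer.prototype.consume in PermissionConsumer.spec.ts):

```typescript
// Record every call to consume() and assert on what it received.
function makeConsumeSpy() {
  const calls: Record<string, unknown>[] = []
  return { consume: (args: Record<string, unknown>) => calls.push(args), calls }
}

const spy = makeConsumeSpy()

const options = {
  handlers: { permissions: () => {} },
  // function-valued field, simulating AWS MSK IAM sasl.token
  kafka: { sasl: { token: async () => 'tok' } },
  topics: ['permissions'],
}

// The fixed init() strips both handlers and kafka before calling consume()
const { handlers: _, kafka: __, ...consumeOptions } = options
spy.consume(consumeOptions)

console.assert(!('kafka' in spy.calls[0])) // kafka never reaches consume()
console.assert(!('handlers' in spy.calls[0]))
```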

Summary by CodeRabbit

  • Bug Fixes

    • Fixed Kafka consumer configuration handling to prevent serialization errors with authentication token providers.
  • Tests

    • Added regression test for Kafka consumer configuration validation.

@coderabbitai

coderabbitai Bot commented May 8, 2026

Review Change Stack
No actionable comments were generated in the recent review. 🎉

📥 Commits

Reviewing files that changed from the base of the PR and between 8f1c31b and 38d3a6b.

📒 Files selected for processing (2)
  • packages/kafka/lib/AbstractKafkaConsumer.ts
  • packages/kafka/test/consumer/PermissionConsumer.spec.ts

📝 Walkthrough

AbstractKafkaConsumer.init() now excludes the kafka option object from options forwarded to consumer.consume(). This prevents function-valued connection-level options such as SASL/OAUTH token providers from reaching MessagesStream's structured cloning, which would otherwise trigger a DataCloneError. A regression test verifies the fix.

Changes

Kafka Consumer Option Filtering

  • Implementation fix (packages/kafka/lib/AbstractKafkaConsumer.ts): The init() method now destructures options to exclude both handlers and kafka, ensuring only consume-relevant settings are forwarded. Comments explain why connection-level options with function values must not reach MessagesStream.
  • Test coverage (packages/kafka/test/consumer/PermissionConsumer.spec.ts): Added regression tests that spy on Consumer.consume() to verify the kafka option is stripped from consume options, including when kafka contains function-valued fields such as token providers.

Estimated code review effort

🎯 2 (Simple) | ⏱️ ~10 minutes

Possibly related PRs

  • kibertoad/message-queue-toolkit#419: Both PRs modify AbstractKafkaConsumer's init/consume logic; this PR strips the kafka option while the related PR refactors init/consumer construction for reconnect behavior.
  • kibertoad/message-queue-toolkit#393: Both PRs modify AbstractKafkaConsumer.init() in packages/kafka/lib/AbstractKafkaConsumer.ts, changing how consume() is invoked and handled.
  • kibertoad/message-queue-toolkit#410: Both PRs modify AbstractKafkaConsumer.ts including changes to init and consume wiring; this PR strips the kafka option while the related PR refactors init/consume streaming and batch handling.

Suggested labels

patch

Suggested reviewers

  • kibertoad
  • CarlosGamero
  • kjamrog

Poem

🐰 A hop through option flows so free,
The kafka field stripped with clarity,
No clones of functions cause dismay,
Token providers find their way! 🎉

🚥 Pre-merge checks: ✅ 5 passed
  • Description Check: ✅ Passed. Check skipped; CodeRabbit's high-level summary is enabled.
  • Title Check: ✅ Passed. The title accurately and concisely describes the main change: excluding the kafka config object from Consumer.consume() arguments to prevent DataCloneError crashes.
  • Docstring Coverage: ✅ Passed. No functions found in the changed files to evaluate; docstring coverage check skipped.
  • Linked Issues Check: ✅ Passed. Check skipped because no linked issues were found for this pull request.
  • Out of Scope Changes Check: ✅ Passed. Check skipped because no linked issues were found for this pull request.


@CarlosGamero CarlosGamero merged commit 2827e7e into kibertoad:main May 8, 2026
7 of 8 checks passed
